Assignment 03

Author
Affiliation

Sabrina Minaya Vasquez

Boston University

Published

September 24, 2025

Modified

September 21, 2025

1 Import Packges

import pandas as pd
import plotly.express as px
import plotly.io as pio
pio.renderers.default = "svg"

from pyspark.sql import SparkSession
import re
import numpy as np
import plotly.graph_objects as go

from pyspark.sql.functions import col, split, explode, regexp_replace, transform, when
from pyspark.sql import functions as F
from pyspark.sql.functions import col, monotonically_increasing_id

# Set random seed
np.random.seed(42)

# Change Plotly renderer for notebooks
pio.renderers.default = "notebook"

2 Load Dataset

# Initialize Spark Session
spark = SparkSession.builder.appName("LightcastData").getOrCreate()

# Load Data
df = spark.read.option("header", "true").option("inferSchema", "true").option("multiLine","true").option("escape", "\"").csv("/home/ubuntu/assignment-03-Sabrina1211/data/lightcast_job_postings.csv")

# Show Schema and Sample Data
#print("---This is Diagnostic check, No need to print it in the final doc---")

#df.printSchema() # comment this line when rendering the submission
#df.show(5)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/21 00:10:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[Stage 1:>                                                          (0 + 1) / 1]                                                                                

3 Data Preparation

# Step 1: Casting Salary and experience columns
df = df.withColumn("SALARY", col("SALARY").cast("float")) \
       .withColumn("SALARY_FROM", col("SALARY_FROM").cast("float")) \
       .withColumn("SALARY_TO", col("SALARY_TO").cast("float")) \
       .withColumn("MIN_YEARS_EXPERIENCE", col("MIN_YEARS_EXPERIENCE").cast("float")) \
       .withColumn("MAX_YEARS_EXPERIENCE", col("MAX_YEARS_EXPERIENCE").cast("float"))


# Step 2: Computing medians for salary columns

def compute_median(sdf, col_name):
    q = sdf.approxQuantile(col_name, [0.5], 0.01)
    return q[0] if q else None

median_from = compute_median(df, "SALARY_FROM")
median_to = compute_median(df, "SALARY_TO")
median_salary = compute_median(df, "SALARY")

print("Medians:", median_from, median_to, median_salary)

# Step 4: Imputing missing salaries, but no experience
df = df.fillna({
     "SALARY_FROM": median_from,
     "SALARY_TO": median_to,
     "SALARY": median_salary
})

# Step 5: Computing Average Salary
df = df.withColumn("Average_Salary", (col("SALARY_FROM") + col("SALARY_TO")) /
2)

# Step 6: Selecting required columns
export_cols = [
    "EDUCATION_LEVELS_NAME",
    "REMOTE_TYPE_NAME",
    "MAX_YEARS_EXPERIENCE",
    "Average_Salary",
    "SALARY",
    "LOT_V6_SPECIALIZED_OCCUPATION_NAME"
]

df_selected = df.select(*export_cols)

# Step 7: Saving to csv
pdf = df_selected.toPandas()
pdf.to_csv("./data/lightcast_cleaned.csv", index=False)

print("Data cleaning complete. Rows retained:", len(pdf))
[Stage 2:>                                                          (0 + 1) / 1]                                                                                [Stage 3:>                                                          (0 + 1) / 1]                                                                                [Stage 4:>                                                          (0 + 1) / 1]                                                                                
Medians: 87295.0 130042.0 115024.0
[Stage 5:>                                                          (0 + 1) / 1]                                                                                
Data cleaning complete. Rows retained: 72498

4 Salary Distribution by Industry and Employment Type

# Your Code for 1st question here

# Filter out missing or zero salary values
pdf = (
    df.select("EMPLOYMENT_TYPE_NAME", "SALARY")
      .withColumn("SALARY", col("SALARY").cast("double"))
      .filter(col("SALARY") > 0)
      .toPandas()
)

# Clean employment type names for better readability
pdf["EMPLOYMENT_TYPE_NAME"] = (
    pdf["EMPLOYMENT_TYPE_NAME"]
      .astype("string")           # ensures string ops work
      .fillna("Unknown")          # avoid NoneType
      .str.replace(r"[^\x00-\x7F]+", "", regex=True)
      .str.strip()
)

# Compute median salary for sorting
median_salaries = pdf.groupby("EMPLOYMENT_TYPE_NAME")["SALARY"].median()

# Sort employment types based on median salary in descending order
sorted_employment_types = median_salaries.sort_values(ascending=False).index

# Apply sorted categories
pdf["EMPLOYMENT_TYPE_NAME"] = pd.Categorical(
    pdf["EMPLOYMENT_TYPE_NAME"],
    categories=sorted_employment_types,
    ordered=True
)

# Create box plot with horizontal grid lines
fig = px.box(
    pdf,
    x="EMPLOYMENT_TYPE_NAME",
    y="SALARY",
    title="Salary Distribution by Employment Type",
    color_discrete_sequence=["black"],  # Single neutral color
    boxmode="group",
    points="all",  # Show all outliers
)

# Improve layout, font styles, and axis labels
fig.update_layout(
    title=dict(
        text="Salary Distribution by Employment Type",
        font=dict(size=30, family="Arial", color="black", weight="bold"),  # Bigger & Bold Title
    ),
    xaxis=dict(
        title=dict(text="Employment Type", font=dict(size=24, family="Arial", color="black", weight="bold")),  # Bigger X-label
        tickangle=0,  # Rotate X-axis labels for readability
        tickfont=dict(size=18, family="Arial", color="black", weight="bold"),  # Bigger & Bold X-ticks
        showline=True,  # Show axis lines
        linewidth=2,  # Thicker axis lines
        linecolor="black",
        mirror=True,
        showgrid=False,  # Remove vertical grid lines
        categoryorder="array",
        categoryarray=sorted_employment_types.tolist(),
    ),
    yaxis=dict(
        title=dict(text="Salary (K $)", font=dict(size=24, family="Arial", color="black", weight="bold")),  # Bigger Y-label
        tickvals=[0, 50000, 100000, 150000, 200000, 250000, 300000, 350000, 400000, 450000, 500000],
        ticktext=["0", "50K", "100K", "150K", "200K", "250K", "300K", "350K", "400K", "450K", "500K"],
        tickfont=dict(size=18, family="Arial", color="black", weight="bold"),  # Bigger & Bold Y-ticks
        showline=True,
        linewidth=2,
        linecolor="black",
        mirror=True,
        showgrid=True,  # Enable light horizontal grid lines
        gridcolor="lightgray",  # Light shade for the horizontal grid
        gridwidth=0.5,  # Thin grid lines
    ),
    font=dict(family="Arial", size=16, color="black"),
    boxgap=0.7,
    plot_bgcolor="white",
    paper_bgcolor="white",
    showlegend=False,
    height=500,
    width=850,
)

# Show the figure
fig.show()
[Stage 6:>                                                          (0 + 1) / 1]